{
  "cells": [
    {
      "cell_type": "code",
      "execution_count": 39,
      "metadata": {
        "id": "CbCjxaCTpTI7"
      },
      "outputs": [],
      "source": [
        "# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
        "\n",
        "# Licensed to the Apache Software Foundation (ASF) under one\n",
        "# or more contributor license agreements. See the NOTICE file\n",
        "# distributed with this work for additional information\n",
        "# regarding copyright ownership. The ASF licenses this file\n",
        "# to you under the Apache License, Version 2.0 (the\n",
        "# \"License\"); you may not use this file except in compliance\n",
        "# with the License. You may obtain a copy of the License at\n",
        "#\n",
        "#   http://www.apache.org/licenses/LICENSE-2.0\n",
        "#\n",
        "# Unless required by applicable law or agreed to in writing,\n",
        "# software distributed under the License is distributed on an\n",
        "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
        "# KIND, either express or implied. See the License for the\n",
        "# specific language governing permissions and limitations\n",
        "# under the License"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "MQzPKKsd-LvV"
      },
      "source": [
        "# Natural Language Processing using Streaming Beam Pipeline\n",
        "\n",
        "<table align=\"left\">\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://colab.sandbox.google.com/github/apache/beam/blob/master/examples/notebooks/beam-ml/nlp_tensorflow_streaming.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/colab_32px.png\" />Run in Google Colab</a>\n",
        "  </td>\n",
        "  <td>\n",
        "    <a target=\"_blank\" href=\"https://github.com/apache/beam/blob/master/examples/notebooks/beam-ml/nlp_tensorflow_streaming.ipynb\"><img src=\"https://raw.githubusercontent.com/google/or-tools/main/tools/github_32px.png\" />View source on GitHub</a>\n",
        "  </td>\n",
        "</table>\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "m0xDYq_X-M18"
      },
      "source": [
        "Natural Language Processing or NLP is a field of Artifical Intelligence that enables computers to interpret and understand human language. It involves multiple steps such as applying various preprocessing functions, getting predictions from a model, storing the predictions in a useful format, etc.\n",
        "Sentiment Analysis is a popular use case of NLP, which allows computers to analyze the sentiment of a text. This notebook demonstrates the use of streaming pipelines in NLP.\n",
        "* Extracts comments using [Youtube API](https://developers.google.com/youtube/v3) and publishing them to Pub/Sub\n",
        "* Trains a TensorFlow model to predict the sentiment of text\n",
        "* Stores the model in Google Cloud and creates a model handler\n",
        "* Builds a Beam pipeline to:\n",
        " 1. Read data from Pub/Sub\n",
        " 2. Create a [PCollection](https://beam.apache.org/documentation/programming-guide/#pcollections) of input text\n",
        " 3. Perform preprocessing [transforms](https://beam.apache.org/documentation/programming-guide/#transforms)\n",
        " 4. RunInference to get predictions from the previously trained model\n",
        " 5. Store the results\n",
        "\n",
        "For more information on using Apache Beam for machine learning, have a look at [AI/ML Pipelines using Beam](https://beam.apache.org/documentation/ml/overview/)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DH64zgK17BlJ"
      },
      "source": [
        "## Installing necessary libraries"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 40,
      "metadata": {
        "id": "3CflkK6F2Zot"
      },
      "outputs": [],
      "source": [
        "!pip install apache-beam[interactive,gcp] --quiet\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2eRsXCuq7Fv-"
      },
      "source": [
        "## Importing libraries\n",
        "\n",
        "Here's a brief overview of the libraries we have imported and what they do:\n",
        "* **NumPy**: It provides support for multi-dimensional arrays, along with many mathematical functions to operate on these arrays efficiently.\n",
        "* **Pandas**: It allows us to work efficiently with structured or tabular data. Here we have used pandas to import a dataset from a csv file and manipulate it.\n",
        "* **TextBlob**: It is a library for processing textual data and common NLP tasks. Here we have used it to analyze comments and find their sentiment polarity.\n",
        "* **Apache Beam**: It is used to build and execute data processing pipelines.\n",
        "* **RunInference**: It uses a pretrained model to predict results for new unseen data.\n",
        "* **TFModelHandlerNumpy**: It is used to manage trained TensorFlow models that take NumPy arrays as input."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 41,
      "metadata": {
        "id": "Zj50mA55ULJ5"
      },
      "outputs": [],
      "source": [
        "import numpy as np\n",
        "import pandas as pd\n",
        "import tensorflow as tf\n",
        "\n",
        "from textblob import TextBlob\n",
        "\n",
        "import apache_beam as beam\n",
        "from apache_beam.ml.inference.base import RunInference\n",
        "from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy\n",
        "from apache_beam.options import pipeline_options\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "BeyZU5fa6b1C"
      },
      "source": [
        "## Sentiment Analysis\n",
        "\n",
        "Sentiment analysis is an NLP technique used to determine the sentiment or emotion expressed in a piece of text. The goal of sentiment analysis is to identify whether the text expresses a positive, negative, or neutral sentiment towards a particular subject or topic.\n",
        "\n",
        "Our goal is to build a streaming pipeline that ultimately tells us the sentiment of YouTube comments. For that, we need to have a pretrained model that can predict the sentiment of text."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "pEnsIzu-3o_-"
      },
      "source": [
        "## Training a model with labeled youtube comments dataset\n",
        "The dataset can be found on [Kaggle](https://www.kaggle.com/datasets/datasnaek/youtube?select=UScomments.csv). It contains various statistics for comments on trending videos on YouTube. Since our goal is to perform a sentiment analysis on comments, we only have to consider the text.\n",
        "\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "uJMnkxSnByeL"
      },
      "source": [
        "#### Reading the data from a csv file\n",
        "Pandas allows us to load data from a CSV file and convert it into a DataFrame, which makes it easy to perform data analysis and manipulation."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 42,
      "metadata": {
        "id": "WPzS1qRV2ft1"
      },
      "outputs": [],
      "source": [
        "comm = pd.read_csv('UScomments.csv',encoding='utf8',nrows = 1000, error_bad_lines=False)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "W1veygZRCTA0"
      },
      "source": [
        "The dataset has no labels, so we have used [TextBlob](https://textblob.readthedocs.io/en/dev/) to assign the appropriate labels by finding sentiment polarity. Polarity is a number between -1 to 1, which depicts the sentiment of a text. -1 represents most negative and 1 represents most positive sentiment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 43,
      "metadata": {
        "id": "h7tsSISv23F-"
      },
      "outputs": [],
      "source": [
        "pol=[]\n",
        "for i in comm.comment_text.values:\n",
        "    try:\n",
        "        analysis =TextBlob(i)\n",
        "        pol.append(analysis.sentiment.polarity)\n",
        "\n",
        "    except:\n",
        "        pol.append(0)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bLTIrypMCVmf"
      },
      "source": [
        "We need to convert the continuous numerical values of sentiment polarity into categorical values. We will add a new column 'pol' to the DataFrame which contains the categorical labels.\n",
        "* pol = 0 means positive comment (Sentiment polarity should be 0 or more)\n",
        "* pol = 1 means negative comment (Sentiment polarity should be less than 0)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 44,
      "metadata": {
        "id": "LrKXREKh284D"
      },
      "outputs": [],
      "source": [
        "comm['pol']=pol\n",
        "comm['pol'][comm.pol >= 0]= 0 #positive\n",
        "comm['pol'][comm.pol < 0]= 1 #negative"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "9eoFlczACsRR"
      },
      "source": [
        "Next, we can drop unnecessary columns from the dataset"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 45,
      "metadata": {
        "id": "neZAKUkk2_Kp"
      },
      "outputs": [],
      "source": [
        "comm = comm.drop(['video_id','likes','replies'],axis=1)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8smzuTjUC0yE"
      },
      "source": [
        "### Preprocessing\n",
        "\n",
        "Preprocessing refers to the series of steps taken to clean, transform, and prepare raw text data. This preprocessed data can easily be fed into our ML framework and provide better results."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 46,
      "metadata": {
        "id": "fOG1MWhsIQW0"
      },
      "outputs": [],
      "source": [
        "#Dropping null values\n",
        "comm = comm.dropna()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 47,
      "metadata": {
        "id": "EJ1lxVNP3vIz"
      },
      "outputs": [],
      "source": [
        "#Removing unnecessary characters\n",
        "def remove_symbols(text):\n",
        "    return text.replace(\"[^a-zA-Z#]\", \" \")\n",
        "comm['comment_text'] = comm['comment_text'].map(remove_symbols)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 48,
      "metadata": {
        "id": "cS9hhcrG4EVP"
      },
      "outputs": [],
      "source": [
        "#Removing words of length 3 or less\n",
        "def remove_short_words(text):\n",
        "    return ' '.join([str(w) for w in text.split() if len(str(w))>3])\n",
        "comm['comment_text'] = comm['comment_text'].map(remove_short_words)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 49,
      "metadata": {
        "id": "wopFqXpN4aBL"
      },
      "outputs": [],
      "source": [
        "#Converting to lowercase\n",
        "def lower_case(text):\n",
        "    return text.lower()\n",
        "comm['comment_text'] = comm['comment_text'].map(lower_case)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1GwE_Uufdrq9"
      },
      "source": [
        "Next, we will divide our dataset into 2 parts:\n",
        "* X = Array of string comments\n",
        "* Y = Polarity Category (0 or 1)\n",
        "\n",
        "Here X is the unlabeled data which we will use for training and testing our model, and Y contains the corresponding labels."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 50,
      "metadata": {
        "id": "i3zX3oLs5jU_"
      },
      "outputs": [],
      "source": [
        "X = np.array(comm['comment_text'])\n",
        "Y = np.array(comm['pol'])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "NRAyKOXHfEpU"
      },
      "source": [
        "Now we need to split the data into training and testing splits. The train split will be used for training the model, and the test split will be used to check it's performance."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 51,
      "metadata": {
        "id": "2oI-gFhs7RMf"
      },
      "outputs": [],
      "source": [
        "from sklearn.model_selection import train_test_split\n",
        "X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=0)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "3mgtFKPHJqHm"
      },
      "source": [
        "TensorFlow's [Tokenizer](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text/Tokenizer) is used to convert text into an array of numbers that represent it based on the frequency of each word. This is done because an ML model can't process text directly, it can only process vectors of numbers. The [fit_on_texts](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text/Tokenizer#fit_on_texts) method updates the vocabulary of the tokenizer based on the words present in the data passed to it."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 52,
      "metadata": {
        "id": "Oi210fpELrnf"
      },
      "outputs": [],
      "source": [
        "tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=10000, oov_token='<UNK>')\n",
        "tokenizer.fit_on_texts(comm['comment_text'])"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8p39hoJKM0wZ"
      },
      "source": [
        "Here we have defined a function that takes a tokenizer and array of strings as input, and returns an array of tokenized strings.\n",
        "* [texts_to_sequences](https://www.tensorflow.org/api_docs/python/tf/keras/preprocessing/text/Tokenizer#texts_to_sequences): Converts text to a sequence of numbers, or tokens.\n",
        "* [pad_sequences](https://www.tensorflow.org/api_docs/python/tf/keras/utils/pad_sequences): Transforms tokens of different sizes to the same size."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 53,
      "metadata": {
        "id": "XxtYDlejLrrA"
      },
      "outputs": [],
      "source": [
        "maxlen=100\n",
        "def get_sequences(comments):\n",
        "    sequences = tokenizer.texts_to_sequences(comments)\n",
        "    padded = tf.keras.utils.pad_sequences(sequences, truncating = 'post', padding='post', maxlen=maxlen)\n",
        "    return padded"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "DF2qMnof37J3"
      },
      "source": [
        "Using the function defined aboved, now we will tokenize the X_train and X_test datasets."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 54,
      "metadata": {
        "id": "WLZd3QhHMyZQ"
      },
      "outputs": [],
      "source": [
        "padded_seq_train = get_sequences(X_train)\n",
        "padded_seq_test = get_sequences(X_test)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "1mT_Js8pUbs-"
      },
      "source": [
        "### Building a simple TensorFlow model to predict polarity of comments\n",
        "\n",
        "Now that we have our preprocessed training and testing data, we need to build a model. This model will take input as strings and predict which category (positive or negative) the string belongs to. Here is a brief description of the layers we have used to build this model:\n",
        "* **Embedding Layer**: It converts tokens into dense vector representations. This allows a neural network to capture semantic relationships between words and generalize better on unseen data.\n",
        "* **Bidirectional Layer**: It enhances the information flow by processing input sequences in both forward and backward directions. It is used for sequential data like natural language.\n",
        "* **Dense Layer**: It connects every neuron in the previous layer to the neurons in the next layer. The comments can either be positive or negative, so there are two categories. Thus, the Dense layer provides two outputs at the end, both of which correspond to the probabibility of the text belonging to that category."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 55,
      "metadata": {
        "id": "5HuyQgQBNzaB"
      },
      "outputs": [],
      "source": [
        "model = tf.keras.models.Sequential([\n",
        "tf.keras.layers.Embedding(10000,16,input_length=maxlen),\n",
        "tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(20, return_sequences=True)),\n",
        "tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(10)),\n",
        "tf.keras.layers.Dense(2, activation='softmax')\n",
        "])\n",
        "model.compile(\n",
        "     loss='sparse_categorical_crossentropy',\n",
        "     optimizer='adam',\n",
        "     metrics=['accuracy']\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "u1cOz6wVXKwG"
      },
      "source": [
        "Next, we will create a checkpoint to save the model with the best validation accuracy."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 56,
      "metadata": {
        "id": "XrosBmBqX0yO"
      },
      "outputs": [],
      "source": [
        "checkpoint_acc = tf.keras.callbacks.ModelCheckpoint(\"weights_acc\", monitor=\"val_accuracy\",\n",
        "save_best_only=True)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "O-lZlCWQb6ut"
      },
      "source": [
        "Now we will train our model by fitting it with the training data and using testing data for validation."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 57,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "aQwWsnwEPY82",
        "outputId": "1d0e5578-04b7-42e6-8b17-b53f18e6f8c9"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "Epoch 1/10\n",
            "25/25 [==============================] - ETA: 0s - loss: 0.5931 - accuracy: 0.7650"
          ]
        },
        {
          "name": "stderr",
          "output_type": "stream",
          "text": [
            "WARNING:absl:Found untraced functions such as _update_step_xla, lstm_cell_7_layer_call_fn, lstm_cell_7_layer_call_and_return_conditional_losses, lstm_cell_8_layer_call_fn, lstm_cell_8_layer_call_and_return_conditional_losses while saving (showing 5 of 9). These functions will not be directly callable after loading.\n"
          ]
        },
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\b\r25/25 [==============================] - 60s 2s/step - loss: 0.5931 - accuracy: 0.7650 - val_loss: 0.3625 - val_accuracy: 0.8900\n",
            "Epoch 2/10\n",
            "25/25 [==============================] - 4s 169ms/step - loss: 0.4663 - accuracy: 0.8163 - val_loss: 0.3644 - val_accuracy: 0.8900\n",
            "Epoch 3/10\n",
            "25/25 [==============================] - 3s 120ms/step - loss: 0.4550 - accuracy: 0.8163 - val_loss: 0.3663 - val_accuracy: 0.8900\n",
            "Epoch 4/10\n",
            "25/25 [==============================] - 3s 112ms/step - loss: 0.4268 - accuracy: 0.8163 - val_loss: 0.3683 - val_accuracy: 0.8900\n",
            "Epoch 5/10\n",
            "25/25 [==============================] - 3s 111ms/step - loss: 0.3700 - accuracy: 0.8238 - val_loss: 0.3266 - val_accuracy: 0.8900\n",
            "Epoch 6/10\n",
            "25/25 [==============================] - 4s 179ms/step - loss: 0.2280 - accuracy: 0.9237 - val_loss: 0.3199 - val_accuracy: 0.8450\n",
            "Epoch 7/10\n",
            "25/25 [==============================] - 4s 152ms/step - loss: 0.1034 - accuracy: 0.9825 - val_loss: 0.4166 - val_accuracy: 0.8000\n",
            "Epoch 8/10\n",
            "25/25 [==============================] - 3s 108ms/step - loss: 0.0597 - accuracy: 0.9900 - val_loss: 0.4530 - val_accuracy: 0.8500\n",
            "Epoch 9/10\n",
            "25/25 [==============================] - 3s 114ms/step - loss: 0.0311 - accuracy: 0.9962 - val_loss: 0.5967 - val_accuracy: 0.8000\n",
            "Epoch 10/10\n",
            "25/25 [==============================] - 3s 106ms/step - loss: 0.0142 - accuracy: 0.9987 - val_loss: 0.8104 - val_accuracy: 0.7800\n"
          ]
        },
        {
          "data": {
            "text/plain": [
              "<keras.callbacks.History at 0x7eda55b3a4d0>"
            ]
          },
          "execution_count": 57,
          "metadata": {},
          "output_type": "execute_result"
        }
      ],
      "source": [
        "model.fit(\n",
        "     padded_seq_train, y_train,\n",
        "     validation_data=(padded_seq_test, y_test),\n",
        "     epochs=10,\n",
        "     callbacks=checkpoint_acc\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "ow06ihbv7JkW"
      },
      "source": [
        "## Authenticating for Google cloud\n",
        "\n",
        "We need to authenticate our google account for the following:\n",
        "* Saving the model in Google cloud\n",
        "* Publishing messages in Pub/Sub\n",
        "* Accessing previously published messages using a subscription"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 58,
      "metadata": {
        "id": "POlTUDj8D0_L"
      },
      "outputs": [],
      "source": [
        "from google.colab import auth\n",
        "auth.authenticate_user()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "TzpdiU39gA9v"
      },
      "source": [
        "## Saving the model in Google cloud\n",
        "\n",
        "We will save the model in Google Cloud so that we can easily load it using RunInference. Then it can be used to predict results for the input data in our Beam pipeline."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 59,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "2fpLQvm_ZEze",
        "outputId": "35b6d356-c86d-43e5-e882-ddef6bc6d214"
      },
      "outputs": [
        {
          "data": {
            "text/plain": [
              "<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x7eda3c5c42b0>"
            ]
          },
          "execution_count": 59,
          "metadata": {},
          "output_type": "execute_result"
        }
      ],
      "source": [
        "model.load_weights('weights_acc')"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 60,
      "metadata": {
        "id": "-3H9m7NrZTs9"
      },
      "outputs": [],
      "source": [
        "save_model_dir = '' # Add the link to your GCS bucket here"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 61,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "cS5wxvxTZgcp",
        "outputId": "999c6366-d887-49aa-8e40-0ebb995006d6"
      },
      "outputs": [
        {
          "name": "stderr",
          "output_type": "stream",
          "text": [
            "WARNING:absl:Found untraced functions such as _update_step_xla, lstm_cell_7_layer_call_fn, lstm_cell_7_layer_call_and_return_conditional_losses, lstm_cell_8_layer_call_fn, lstm_cell_8_layer_call_and_return_conditional_losses while saving (showing 5 of 9). These functions will not be directly callable after loading.\n"
          ]
        }
      ],
      "source": [
        "model.save(save_model_dir)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "a-lXoeTqo7UG"
      },
      "source": [
        "## Creating a model handler\n",
        "\n",
        "A model handler is used to save, load and manage trained ML models. Here we used TFModelHandlerNumpy as our input text is in the form of numpy arrays."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 62,
      "metadata": {
        "id": "mTR2SonmZiBQ"
      },
      "outputs": [],
      "source": [
        "model_handler = TFModelHandlerNumpy(save_model_dir)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "94aXKrR0Cnvw"
      },
      "source": [
        "## Understanding Pub/Sub\n",
        "\n",
        "Google Cloud [Pub/Sub](https://cloud.google.com/pubsub/docs/overview) is a messaging service provided by Google Cloud Platform (GCP). It is designed to enable scalable, reliable, and real-time messaging between independent applications. Pub/Sub follows the publish-subscribe model, where messages are published by senders (publishers) to a topic, and then delivered to multiple receivers (subscribers) who have expressed interest in that topic. <br> <br>\n",
        "Pub/Sub acts as an unbounded source, as it's constantly receiving and sending messages in real time. In such cases, we need to build a [Streaming Pipeline](https://beam.apache.org/documentation/sdks/python-streaming/)."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "QMPdmWS57Oww"
      },
      "source": [
        "## Creating a publisher for a pubsub topic in Google Cloud Console\n",
        "A publisher is a component that allows us to create and send messages to Google Cloud Pub/Sub. Learn more about publishing and received messages from Pub/Sub [here](https://cloud.google.com/pubsub/docs/publish-receive-messages-client-library)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 63,
      "metadata": {
        "id": "2UyYvEcz9IAx"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "from google.cloud import pubsub_v1\n",
        "# Add your project ID here\n",
        "PROJECT_ID = '<PROJECT_ID>' # @param {type:'string'}\n",
        "# Add your topic name here\n",
        "TOPIC = '<TOPIC>' # @param {type:'string'}\n",
        "publisher = pubsub_v1.PublisherClient()\n",
        "topic_name = 'projects/{project_id}/topics/{topic}'.format(\n",
        "    project_id = PROJECT_ID,\n",
        "    topic = TOPIC,\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "2wAnY8zz7cIu"
      },
      "source": [
        "## Extracting and sending comments to Pub/Sub\n",
        "YouTube API provides an interface for accessing YouTube data. First, we need to enable YouTube API on the Google Cloud Console project. After that, we need to create a credential, which will further provide an API key. This API key, along with a video ID can be used to access data of that YouTube video. The Publisher created earlier is used to publish each comment to Pub/Sub.\n",
        "\n",
        "See examples of using the YouTube API [here](https://developers.google.com/youtube/v3/code_samples/code_snippets)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 64,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "frKiefXGHXve",
        "outputId": "565d73f5-ec69-498b-91fc-211ce4c495e7"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "Can’t wait to watch you guys grow . Harmonies are on point and the oversized early 90’s blazers are a great touch.\n",
            "Amazing performance! Such an inspiring group ❤\n",
            "Love the vibe\n",
            "Your telling me this has less than 100 views????  Unreal\n",
            "I&#39;m happy that I lived long enough to see and hear music that tells the truth that millions of men and boys live every day! WELL DONE!\n",
            "Love the unity of sound\n"
          ]
        }
      ],
      "source": [
        "from googleapiclient.discovery import build\n",
        "\n",
        "api_key = '' #Add your API key here\n",
        "\n",
        "def video_comments(video_id):\n",
        "    # Creating youtube resource object\n",
        "    youtube = build('youtube', 'v3',\n",
        "                    developerKey=api_key)\n",
        "\n",
        "    # Retrieve youtube video results\n",
        "    video_response=youtube.commentThreads().list(\n",
        "    part='snippet,replies',\n",
        "    videoId=video_id\n",
        "    ).execute()\n",
        "\n",
        "    # Iterate video response\n",
        "    while video_response:\n",
        "\n",
        "        # extracting required info from each object\n",
        "        for item in video_response['items']:\n",
        "\n",
        "            # Extracting comments\n",
        "            comment = item['snippet']['topLevelComment']['snippet']['textDisplay']\n",
        "\n",
        "            # Print comment\n",
        "            print(comment, end = '\\n')\n",
        "            data = comment.encode(\"utf-8\")\n",
        "\n",
        "            # Publishing the comment to Pub/Sub\n",
        "            publisher.publish(topic_name, data)\n",
        "\n",
        "        # Repeat until there are no next pages\n",
        "        if 'nextPageToken' in video_response:\n",
        "            video_response = youtube.commentThreads().list(\n",
        "                    part = 'snippet,replies',\n",
        "                    videoId = video_id\n",
        "                ).execute()\n",
        "        else:\n",
        "          return\n",
        "\n",
        "# The video ID can be extracted from the video URL, which can be represented like this\n",
        "# https://www.youtube.com/watch?v=VIDEO_ID\n",
        "# Enter here the desired video ID\n",
        "video_id = \"fCXYrAH2gQI\"\n",
        "\n",
        "# Call function\n",
        "video_comments(video_id)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "P51E6640pbw4"
      },
      "source": [
        "## Defining utility functions\n",
        "\n",
        "Below we have defined some functions for our Beam pipeline to perform the following tasks:\n",
        "* Print the messages received from Pub/Sub\n",
        "* Tokenize the strings\n",
        "* Save the predictions in a list\n",
        "\n",
        "These functions can be used in our pipeline by using [Map](https://beam.apache.org/documentation/transforms/python/elementwise/map/), which essentially calls the function on each element in the PCollection."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 65,
      "metadata": {
        "id": "kdzXM_ooco9-"
      },
      "outputs": [],
      "source": [
        "# Index 0 corresponds to positive comment while index 1 corresponds to negative comment\n",
        "labels = ['positive','negative']"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 66,
      "metadata": {
        "id": "HKbvU544Cd6v"
      },
      "outputs": [],
      "source": [
        "# Printing values\n",
        "def print_values(element):\n",
        "  print(element)\n",
        "  return element\n",
        "\n",
        "# Here along with printing, we have also returned the element.\n",
        "# This is done so that the element is passed into the next functions or transforms after printing."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 67,
      "metadata": {
        "id": "gNpyjcEwLny9"
      },
      "outputs": [],
      "source": [
        "# Tokenizing the strings\n",
        "def tokenize(element):\n",
        "    padded_seq = get_sequences([element])\n",
        "    return padded_seq[0]"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 68,
      "metadata": {
        "id": "58xSecy4Jfn8"
      },
      "outputs": [],
      "source": [
        "# Saving predictions in a list\n",
        "predictions = []\n",
        "from tensorflow.python.ops.numpy_ops import np_config\n",
        "np_config.enable_numpy_behavior()\n",
        "def save_predictions(element):\n",
        "    list_of_predictions = element.inference.tolist()\n",
        "    highest_prediction = max(list_of_predictions)\n",
        "    ans = labels[list_of_predictions.index(highest_prediction)]\n",
        "    predictions.append([list_of_predictions,ans])\n",
        "    print(ans)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "YaqDqk9w0LlA"
      },
      "source": [
        "## Building an Apache Beam Pipeline\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "OgH15nbK0Whw"
      },
      "source": [
        "We need to build a streaming pipeline that takes data from Pub/Sub. A [Runner](https://beam.apache.org/documentation/#runners) is used to execute Beam pipelines in a distributed manner. We need to use a streaming runner to run a streaming pipeline. [InteractiveRunner](https://beam.apache.org/releases/pydoc/2.10.0/apache_beam.runners.interactive.interactive_runner.html) is suitable for this and allows developing and running Beam pipelines interactively in notebooks.\n",
        "\n",
        "See more details on how to use InteractiveRunner [here](https://cloud.google.com/dataflow/docs/guides/interactive-pipeline-development)."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 69,
      "metadata": {
        "id": "Nq1N7JGR9Vwa"
      },
      "outputs": [],
      "source": [
        "# Add the path to your topic here\n",
        "TOPIC_PATH = '<TOPIC_PATH>' # @param {type:'string'}"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 70,
      "metadata": {
        "id": "5-06sxMtwyWN"
      },
      "outputs": [],
      "source": [
        "# Add the path to your subscription here\n",
        "SUBS_PATH = '<SUB_PATH>' # @param {type:'string'}"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "UliBhojEfxhq"
      },
      "source": [
        "Importing InteractiveRunner"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 71,
      "metadata": {
        "id": "Nr6lPbQMhywM"
      },
      "outputs": [],
      "source": [
        "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
        "import apache_beam.runners.interactive.interactive_beam as ib"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 72,
      "metadata": {
        "id": "nuHyErBPZDVY"
      },
      "outputs": [],
      "source": [
        "ib.options.recording_duration = '2m' # This is how long Interactive Runner will listen to data from Pub/Sub\n",
        "ib.options.recording_size_limit = 1e9 # This is the recording size limit set to 1 GB\n",
        "options = pipeline_options.PipelineOptions()\n",
        "options.view_as(pipeline_options.StandardOptions).streaming = True # Streaming mode is set True"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "Wfo0UqQmz821"
      },
      "source": [
        "The pipeline performs the following tasks:\n",
        "* Reads messages from Cloud Pub Sub\n",
        "* Prints the messages\n",
        "* Performs preprocessing. We can reuse all of our previously defined preprocessing functions for training using [beam.Map](https://beam.apache.org/documentation/transforms/python/elementwise/map/).\n",
        "* RunInference on the preprocessed data\n",
        "* Prints the result and store in a list"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 627
        },
        "id": "unDFnXzcLkvF",
        "outputId": "b20179e0-6bbd-407b-8752-1f282ff2c2ed"
      },
      "outputs": [
        {
          "name": "stdout",
          "output_type": "stream",
          "text": [
            "Can’t wait to watch you guys grow . Harmonies are on point and the oversized early 90’s blazers are a great touch.\n",
            "Amazing performance! Such an inspiring group ❤\n",
            "Love the vibe\n",
            "Your telling me this has less than 100 views????  Unreal\n",
            "I&#39;m happy that I lived long enough to see and hear music that tells the truth that millions of men and boys live every day! WELL DONE!\n",
            "Love the unity of sound\n",
            "positive\n",
            "positive\n",
            "positive\n",
            "positive\n",
            "positive\n",
            "positive\n"
          ]
        }
      ],
      "source": [
        "with beam.Pipeline(options=options) as p:\n",
        "    _ = (p | \"Read From Pub/Sub\" >> beam.io.ReadFromPubSub(subscription=SUBS_PATH)\n",
        "           | \"Convert to String\" >> beam.Map(lambda element: element.decode('utf-8'))\n",
        "           | \"Print\" >> beam.Map(print_values)\n",
        "           | \"Remove Symbols\" >> beam.Map(remove_symbols)\n",
        "           | \"Remove Short Words\" >> beam.Map(remove_short_words)\n",
        "           | \"Lower Case\" >> beam.Map(lower_case)\n",
        "           | \"Tokenize\" >> beam.Map(tokenize)\n",
        "           | \"RunInference\" >> RunInference(model_handler)\n",
        "           | \"Store Predictions\" >> beam.Map(save_predictions)\n",
        "        )"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "bIhqhtyvzMXR"
      },
      "source": [
        "The above pipeline is a streaming pipeline, which means it will run continuously, unless we stop it manually. This is why a keyboard interrupt can be seen here."
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "vEch9GkVzp9S"
      },
      "source": [
        "Let us print the predictions made by the model and the corresponding sentiment."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "YqNoxnd4qJAD",
        "outputId": "bde9cf5b-97b6-4197-9629-41d0b53c043a"
      },
      "outputs": [
        {
          "data": {
            "text/plain": [
              "[[[0.852806806564331, 0.14719319343566895], 'positive'],\n",
              " [[0.8602035045623779, 0.13979655504226685], 'positive'],\n",
              " [[0.8670042753219604, 0.13299570977687836], 'positive'],\n",
              " [[0.8574993014335632, 0.14250065386295319], 'positive'],\n",
              " [[0.8401712775230408, 0.15982864797115326], 'positive'],\n",
              " [[0.8648154735565186, 0.13518451154232025], 'positive']]"
            ]
          },
          "execution_count": 38,
          "metadata": {},
          "output_type": "execute_result"
        }
      ],
      "source": [
        "predictions"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
